---
title: MCPAgent
description: "MCPAgent API Documentation"
icon: "bot"
---

# MCPAgent API Reference

## stream

```python
async def stream(
    query: str,
    max_steps: int | None = None,
    manage_connector: bool = True,
    external_history: list[BaseMessage] | None = None,
) -> AsyncGenerator[tuple[AgentAction, str] | str, None]:
```

Stream agent execution step-by-step. Yields intermediate steps as `(AgentAction, str)` tuples, followed by the final result as a string.

**Parameters:**
- `query` (str): The query to execute
- `max_steps` (int, optional): Maximum number of steps to take
- `manage_connector` (bool): Whether to handle connector lifecycle
- `external_history` (list[BaseMessage], optional): External conversation history

**Yields:**
- `(AgentAction, str)`: Intermediate steps containing the action and observation
- `str`: Final result string

**Example:**

```python
async for item in agent.stream("What's the weather like?"):
    if isinstance(item, str):
        print(f"Final result: {item}")
    else:
        action, observation = item
        print(f"Tool: {action.tool}, Result: {observation}")
```

## run

```python
async def run(
    query: str,
    max_steps: int | None = None,
    manage_connector: bool = True,
    external_history: list[BaseMessage] | None = None,
) -> str:
```

Run agent execution and return the final result. Uses the streaming implementation internally.

**Parameters:**
- `query` (str): The query to execute
- `max_steps` (int, optional): Maximum number of steps to take
- `manage_connector` (bool): Whether to handle connector lifecycle
- `external_history` (list[BaseMessage], optional): External conversation history

**Returns:**
- `str`: The final result

**Example:**

```python
result = await agent.run("What's the weather like?")
print(result)
```

## astream

```python
async def astream(
    query: str,
    max_steps: int | None = None,
    manage_connector: bool = True,
    external_history: list[BaseMessage] | None = None,
) -> AsyncIterator[str]:
```

Asynchronous streaming interface for low-level agent events. Yields incremental results, tool actions, and intermediate steps as they are generated by the agent.

**Parameters:**
- `query` (str): The query to execute
- `max_steps` (int, optional): Maximum number of steps to take
- `manage_connector` (bool): Whether to handle connector lifecycle
- `external_history` (list[BaseMessage], optional): External conversation history

**Yields:**
- `str`: Streaming chunks of the agent's output

**Example:**

```python
async for chunk in agent.astream("hello"):
    print(chunk, end="", flush=True)
```

## Method Comparison

| Method | Use Case | Output Type | Granularity |
|--------|----------|-------------|-------------|
| `stream()` | Step-by-step workflow tracking | Steps + final result | Tool-level |
| `run()` | Simple execution | Final result only | Complete |
| `astream()` | Real-time chat interfaces | Streaming chunks | Token-level |

## Configuration Methods

### get_conversation_history

```python
def get_conversation_history() -> list[BaseMessage]:
```

Get the current conversation history.

### clear_conversation_history

```python
def clear_conversation_history() -> None:
```

Clear the conversation history.

### set_system_message

```python
def set_system_message(message: str) -> None:
```

Set a new system message for the agent.

### close

```python
async def close() -> None:
```

Close the MCP connection and clean up resources.
